www.gusucode.com > VC++ P2P下载软件源代码-源码程序 > VC++ P2P下载软件源代码-源码程序\code\client\BufferedSocket.cpp
//Download by http://www.NewXing.com /* * Copyright (C) 2001-2003 Jacek Sieka, j_s@telia.com * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ #include "stdinc.h" #include "DCPlusPlus.h" #include "TimerManager.h" #include "BufferedSocket.h" #include "File.h" #include "CryptoManager.h" #define SMALL_BUFFER_SIZE 1024 // Polling is used for tasks...should be fixed... #define POLL_TIMEOUT 250 /** * Send a chunk of a file * @return True if file is finished, false if there's more data to send */ bool BufferedSocket::threadSendFile() { u_int32_t len; dcassert(file != NULL); try { if(compress) { if(comp == NULL) { comp = new ZCompressor(*file, size); } u_int32_t s = (u_int32_t)min(size, (int64_t) (BOOLSETTING(SMALL_SEND_BUFFER) ? SMALL_BUFFER_SIZE : inbufSize)); u_int32_t bytes; while(true) { { Lock l(cs); if(!tasks.empty()) return false; } if(wait(0, WAIT_READ) & WAIT_READ) return false; u_int32_t br = 0; bytes = comp->compress(inbuf, s, br); if(bytes == 0) { // Finished! delete comp; comp = NULL; dcassert(size == 0); fire(BufferedSocketListener::TRANSMIT_DONE); return true; } else { Socket::write((char*) inbuf, bytes); if(br > 0) { fire(BufferedSocketListener::BYTES_SENT, br); size -= br; dcassert(size >= 0); } } } } else { while(size > 0) { { Lock l(cs); if(!tasks.empty()) return false; } if(wait(0, WAIT_READ) & WAIT_READ) return false; dcassert(inbufSize >= SMALL_BUFFER_SIZE); u_int32_t s = (u_int32_t)min(size, (int64_t) (BOOLSETTING(SMALL_SEND_BUFFER) ? SMALL_BUFFER_SIZE : inbufSize)); if( (len = file->read(inbuf, s)) == 0) { // Premature EOF? dcdebug("BufferedSocket::threadSendFile Read returned 0!!!"); disconnect(); return true; } Socket::write((char*)inbuf, len); fire(BufferedSocketListener::BYTES_SENT, len); size -= len; } } } catch(const Exception& e) { if(comp) { delete comp; comp = NULL; } fail(e.getError()); return true; } fire(BufferedSocketListener::TRANSMIT_DONE); return true; } bool BufferedSocket::fillBuffer(char* buf, int bufLen, u_int32_t timeout /* = 0 */) throw(SocketException) { dcassert(buf != NULL); dcassert(bufLen > 0); int bytesIn = 0; int start = GET_TICK(); while(bytesIn < bufLen) { while(!wait(POLL_TIMEOUT, WAIT_READ)) { { Lock l(cs); if(!tasks.empty()) { // We don't want to handle tasks here... Socket::disconnect(); return false; } } if((timeout != 0) && ((start + timeout) < GET_TICK())) { // Connection timeout throw SocketException(STRING(CONNECTION_TIMEOUT)); } } dcassert(bufLen > bytesIn); int x = Socket::read(buf + bytesIn, bufLen - bytesIn); if(x <= 0) { // ??? throw SocketException(STRING(CONNECTION_CLOSED)); } bytesIn += x; } return true; } void BufferedSocket::threadConnect() { dcdebug("threadConnect()\n"); fire(BufferedSocketListener::CONNECTING); u_int32_t start = GET_TICK(); string s; short p; { Lock l(cs); s=server; p=port; } try { setBlocking(false); Socket::create(); if(SETTING(CONNECTION_TYPE) == SettingsManager::CONNECTION_SOCKS5) { if(!BOOLSETTING(SOCKS_RESOLVE)) { s = resolve(s); } Socket::connect(SETTING(SOCKS_SERVER), (short)SETTING(SOCKS_PORT)); } else { Socket::connect(s, p); } while(!wait(POLL_TIMEOUT, WAIT_CONNECT)) { { Lock l(cs); if(!tasks.empty()) { // We don't want to handle tasks here... Socket::disconnect(); return; } } if((start + 30000) < GET_TICK()) { // Connection timeout fail(STRING(CONNECTION_TIMEOUT)); return; } } // Hm, let's see if we're socksified... if(SETTING(CONNECTION_TYPE) == SettingsManager::CONNECTION_SOCKS5) { if(SETTING(SOCKS_USER).empty() && SETTING(SOCKS_PASSWORD).empty()) { // No username and pw, easier...=) char connStr[3]; connStr[0] = 5; // SOCKSv5 connStr[1] = 1; // 1 method connStr[2] = 0; // Method 0: No auth... Socket::write(connStr, 3); if(!fillBuffer(connStr, 2, 30000)) return; if(connStr[1] != 0) { fail(STRING(SOCKS_NEEDS_AUTH)); return; } } else { // We try the username and password auth type (no, we don't support gssapi) u_int8_t ulen = (u_int8_t)(SETTING(SOCKS_USER).length() & 0xff); u_int8_t plen = (u_int8_t)(SETTING(SOCKS_PASSWORD).length() & 0xff); AutoArray<u_int8_t> connStr(3 + ulen + plen); connStr[0] = 5; // SOCKSv5 connStr[1] = 1; // 1 method connStr[2] = 2; // Method 2: Name/Password... Socket::write((char*)(u_int8_t*)connStr, 3); if(!fillBuffer((char*)(u_int8_t*)connStr, 2, 30000)) return; if(connStr[1] != 2) { fail(STRING(SOCKS_AUTH_UNSUPPORTED)); return; } // Now we send the username / pw... connStr[0] = 1; connStr[1] = ulen; strncpy((char*)(u_int8_t*)connStr + 2, SETTING(SOCKS_USER).c_str(), ulen); connStr[2 + ulen] = plen; strncpy((char*)(u_int8_t*)connStr + 3 + ulen, SETTING(SOCKS_PASSWORD).c_str(), plen); Socket::write((char*)(u_int8_t*)connStr, 3 + plen + ulen); if(!fillBuffer((char*)(u_int8_t*)connStr, 2, 30000)) { return; } if(connStr[1] != 0) { fail(STRING(SOCKS_AUTH_FAILED)); return; } } // Alrite, let's get on with it... AutoArray<u_int8_t> connStr(10 + s.length()); int connLen; connStr[0] = 5; // SOCKSv5 connStr[1] = 1; // Connect connStr[2] = 0; // Reserved if(BOOLSETTING(SOCKS_RESOLVE)) { u_int8_t slen =(u_int8_t)(s.length() & 0xff); connStr[3] = 3; // Address type: domain name connStr[4] = slen; strncpy((char*)(u_int8_t*)connStr + 5, s.c_str(), slen); *((u_int16_t*)(&connStr[5 + slen])) = htons(p); connLen = 7 + slen; } else { connStr[3] = 1; // Address type: IPv4; *((long*)(&connStr[4])) = inet_addr(s.c_str()); *((u_int16_t*)(&connStr[8])) = htons(p); connLen = 10; } Socket::write((char*)(u_int8_t*)connStr, connLen); // We assume we'll get a ipv4 address back...therefore, 10 bytes...if not, things // will break, but hey...noone's perfect (and I'm tired...)... if(!fillBuffer((char*)(u_int8_t*)connStr, 10, 30000)) { return; } if(connStr[0] != 5 || connStr[1] != 0) { fail(STRING(SOCKS_FAILED)); return; } // Yihaa! } // We're connected! Clear the buffers... for(int k = 0; k < BUFFERS; k++) { outbufPos[k] = 0; } line.clear(); setBlocking(true); fire(BufferedSocketListener::CONNECTED); } catch(const SocketException& e) { if(SETTING(CONNECTION_TYPE) == SettingsManager::CONNECTION_SOCKS5) { fail("Socks5: " + e.getError()); } else { fail(e.getError()); } return; } } void BufferedSocket::threadRead() { try { int i = read(inbuf, inbufSize); if(i == -1) { // EWOULDBLOCK, no data recived... return; } else if(i == 0) { // This socket has been closed... disconnect(); return; } int bufpos = 0; string l; while(i > 0) { if(mode == MODE_LINE) { string::size_type pos; l = string((char*)inbuf + bufpos, i); if( (pos = l.find(separator)) != string::npos) { if(!line.empty()) { fire(BufferedSocketListener::LINE, line + l.substr(0, pos)); line.clear(); } else { fire(BufferedSocketListener::LINE, l.substr(0, pos)); } i-=(pos + sizeof(separator)); bufpos += (pos + sizeof(separator)); } else { line += l; i = 0; } } else if(mode == MODE_DATA) { if(dataBytes == -1) { fire(BufferedSocketListener::DATA, inbuf+bufpos, i); bufpos+=i; i = 0; } else { int high = (int)min(dataBytes, (int64_t)i); fire(BufferedSocketListener::DATA, inbuf+bufpos, high); bufpos += high; i-=high; dataBytes -= high; if(dataBytes == 0) { mode = MODE_LINE; fire(BufferedSocketListener::MODE_CHANGE, MODE_LINE); } } } } } catch(const SocketException& e) { dcdebug("BufferedSocket::threadRead caught: %s\n", e.getError().c_str()); // Ouch... fail(e.getError()); return; } } void BufferedSocket::write(const char* aBuf, int aLen) throw() { { Lock l(cs); int newSize = outbufSize[curBuf]; while(newSize < (aLen + outbufPos[curBuf])) { newSize *= 2; } if(newSize > outbufSize[curBuf]) { // Need to grow... dcdebug("Growing outbuf[%d] to %d bytes\n", curBuf, newSize); u_int8_t* tmp = new u_int8_t[newSize]; memcpy(tmp, outbuf[curBuf], outbufPos[curBuf]); delete[] outbuf[curBuf]; outbuf[curBuf] = tmp; outbufSize[curBuf] = newSize; } memcpy(outbuf[curBuf] + outbufPos[curBuf], aBuf, aLen); outbufPos[curBuf] += aLen; addTask(SEND_DATA); } } void BufferedSocket::threadSendData() { int myBuf; { Lock l(cs); myBuf = curBuf; curBuf = (curBuf + 1) % BUFFERS; } if(outbufPos[myBuf] == 0) return; Socket::write((char*)outbuf[myBuf], outbufPos[myBuf]); outbufPos[myBuf] = 0; } /** * Main task dispatcher for the buffered socket abstraction. * @todo Fix the polling... */ int BufferedSocket::run() { bool sendingFile = false; while(true) { try { while(isConnected() ? taskSem.wait(0) : taskSem.wait()) { Tasks t; { Lock l(cs); dcassert(tasks.size() > 0); t = tasks.front(); tasks.erase(tasks.begin()); } switch(t) { case SHUTDOWN: threadShutDown(); return 0; case DISCONNECT: if(isConnected()) fail(STRING(DISCONNECTED)); break; case SEND_FILE: if(isConnected()) sendingFile = true; break; case SEND_DATA: dcassert(!sendingFile); if(isConnected()) threadSendData(); break; case CONNECT: sendingFile = false; threadConnect(); break; default: dcassert("BufferedSocket::threadRun: Unknown command received" == NULL); } } // Now check if there's any activity on the socket if(isConnected()) { int waitFor = wait(POLL_TIMEOUT, sendingFile ? WAIT_READ | WAIT_WRITE : WAIT_READ); if(waitFor & WAIT_WRITE) { dcassert(sendingFile); if(threadSendFile()) sendingFile = false; } if(waitFor & WAIT_READ) { threadRead(); } } } catch(const SocketException& e) { fail(e.getError()); } } return 0; } /** * @file * $Id: BufferedSocket.cpp,v 1.51 2003/07/15 14:53:10 arnetheduck Exp $ */